{
  "cells": [
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Product Recommendation with Feathr on Azure\n",
        "\n",
        "This notebook demonstrates how the Feathr Feature Store can simplify and empower your model training and inference using [Azure Synapse](https://azure.microsoft.com/en-us/products/synapse-analytics/). In this notebook, you will learn how to:\n",
        "\n",
        "1. Define sharable features using the Feathr API\n",
        "2. Create a training dataset via point-in-time feature join with the Feathr API\n",
        "3. Materialize features to an online store and then retrieve them with the Feathr API\n",
        "4. Register features with the register API\n",
        "\n",
        "In this tutorial, we use the Feathr Feature Store to create a model that predicts users' product ratings for an e-commerce website. Because the main purpose of this notebook is to demonstrate Feathr's capabilities on Azure Synapse, we simplify the problem to predicting the ratings for a single product. An advanced example can be found [here](../product_recommendation_demo_advanced.ipynb)."
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## 1. Prerequisites\n",
        "\n",
        "### Use Azure Resource Manager (ARM) to Provision Azure Resources for Feathr\n",
        "\n",
        "The first step is to provision the cloud resources that Feathr requires. Feathr provides a Python-based client to interact with cloud resources. Please follow the steps [here](https://feathr-ai.github.io/feathr/how-to-guides/azure-deployment-arm.html) to provision them. This will create a new resource group and deploy the needed Azure resources in it.\n",
        "\n",
        "If you already have an existing resource group and only want to install a few resources manually, you can refer to the CLI documentation [here](https://feathr-ai.github.io/feathr/how-to-guides/azure-deployment-cli.html). It provides CLI commands to install the needed resources.\n",
        "\n",
        "> Please note: The CLI documentation is intended for advanced users, since many configurations and role assignments have to be done manually. Therefore, the ARM template is the preferred way to deploy.\n",
        "\n",
        "The architecture diagram below shows how the different resources interact with each other.\n",
        "![Architecture](https://github.com/feathr-ai/feathr/blob/main/docs/images/architecture.png?raw=true)"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Set the required permissions\n",
        "\n",
        "Before you proceed further, you need additional permissions to:\n",
        "* Access the Key Vault,\n",
        "* Access the Storage Blob as a Contributor, and\n",
        "* Submit jobs to the Synapse cluster.\n",
        "\n",
        "Run the following commands in the [Cloud Shell](https://shell.azure.com) before moving forward. Please replace `YOUR_RESOURCE_PREFIX` with the value you used in the ARM template deployment.\n",
        "\n",
        "```\n",
        "    resource_prefix=\"YOUR_RESOURCE_PREFIX\"\n",
        "    synapse_workspace_name=\"${resource_prefix}syws\"\n",
        "    keyvault_name=\"${resource_prefix}kv\"\n",
        "    objectId=$(az ad signed-in-user show --query id -o tsv)\n",
        "    az keyvault update --name $keyvault_name --enable-rbac-authorization false\n",
        "    az keyvault set-policy -n $keyvault_name --secret-permissions get list --object-id $objectId\n",
        "    az role assignment create --assignee $objectId --role \"Storage Blob Data Contributor\"\n",
        "    az synapse role assignment create --workspace-name $synapse_workspace_name --role \"Synapse Contributor\" --assignee $objectId\n",
        "```\n"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Install the Feathr Python package and its dependencies\n",
        "\n",
        "Here, we install the package from the repository's main branch. To use the latest release instead, you may run `pip install feathr[notebook]`. In that case, however, some of the newer features used in this notebook might not work."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "%pip install \"git+https://github.com/feathr-ai/feathr.git#subdirectory=feathr_project&egg=feathr[notebook]\""
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## 2. Configure the Feathr Client"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import glob\n",
        "import os\n",
        "import tempfile\n",
        "from datetime import datetime, timedelta\n",
        "from math import sqrt\n",
        "\n",
        "from azure.identity import AzureCliCredential\n",
        "    \n",
        "import pandas as pd\n",
        "from pyspark.sql import DataFrame\n",
        "\n",
        "import feathr\n",
        "from feathr import (\n",
        "    FeathrClient,\n",
        "    BOOLEAN, FLOAT, INT32, ValueType,\n",
        "    Feature, DerivedFeature, FeatureAnchor,\n",
        "    BackfillTime, MaterializationSettings,\n",
        "    FeatureQuery, ObservationSettings,\n",
        "    RedisSink,\n",
        "    INPUT_CONTEXT, HdfsSource,\n",
        "    WindowAggTransformation,\n",
        "    TypedKey,\n",
        ")\n",
        "from feathr.datasets.constants import (\n",
        "    PRODUCT_RECOMMENDATION_USER_OBSERVATION_URL,\n",
        "    PRODUCT_RECOMMENDATION_USER_PROFILE_URL,\n",
        "    PRODUCT_RECOMMENDATION_USER_PURCHASE_HISTORY_URL,\n",
        ")\n",
        "from feathr.datasets.utils import maybe_download\n",
        "from feathr.utils.config import generate_config\n",
        "from feathr.utils.job_utils import get_result_df\n",
        "\n",
        "\n",
        "print(f\"Feathr version: {feathr.__version__}\")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# TODO fill the following values\n",
        "RESOURCE_PREFIX = None           # The prefix value used at the ARM deployment step\n",
        "AZURE_SYNAPSE_SPARK_POOL = None  # Set Azure Synapse Spark pool name\n",
        "ADLS_KEY = None                  # Set Azure Data Lake Storage key to use Azure Synapse\n",
        "\n",
        "PROJECT_NAME = \"product_recommendation_synapse_demo\"\n",
        "SPARK_CLUSTER = \"azure_synapse\"\n",
        "\n",
        "# TODO if you deployed resources manually using different names, you'll need to change the following values accordingly: \n",
        "ADLS_ACCOUNT = f\"{RESOURCE_PREFIX}dls\"\n",
        "ADLS_FS_NAME = f\"{RESOURCE_PREFIX}fs\"\n",
        "AZURE_SYNAPSE_URL = f\"https://{RESOURCE_PREFIX}syws.dev.azuresynapse.net\"  # Set Azure Synapse workspace url to use Azure Synapse\n",
        "KEY_VAULT_URI = f\"https://{RESOURCE_PREFIX}kv.vault.azure.net\"\n",
        "REDIS_HOST = f\"{RESOURCE_PREFIX}redis.redis.cache.windows.net\"\n",
        "REGISTRY_ENDPOINT = f\"https://{RESOURCE_PREFIX}webapp.azurewebsites.net/api/v1\"\n",
        "AZURE_SYNAPSE_WORKING_DIR = f\"abfss://{ADLS_FS_NAME}@{ADLS_ACCOUNT}.dfs.core.windows.net/{PROJECT_NAME}\"\n"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Get Azure CLI credential to access Azure resources\n",
        "\n",
        "Log in to Azure with a device code (you will see instructions in the output once you execute the cell):"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "!az login --use-device-code"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "credential = AzureCliCredential(additionally_allowed_tenants=['*'])"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Set the environment variables\n",
        "Set the environment variables that Feathr will use as configuration. Feathr supports configuration via environment variables and YAML. You can read more about it [here](https://feathr-ai.github.io/feathr/how-to-guides/feathr-configuration-and-env.html)."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "if \"ADLS_KEY\" not in os.environ and ADLS_KEY:\n",
        "    os.environ[\"ADLS_KEY\"] = ADLS_KEY"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "if \"REDIS_PASSWORD\" not in os.environ:\n",
        "    from azure.keyvault.secrets import SecretClient\n",
        "\n",
        "    secret_client = SecretClient(vault_url=KEY_VAULT_URI, credential=credential)\n",
        "    retrieved_secret = secret_client.get_secret('FEATHR-ONLINE-STORE-CONN').value\n",
        "    os.environ['REDIS_PASSWORD'] = retrieved_secret.split(\",\")[1].split(\"password=\", 1)[1]"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "> If the notebook cannot find the Key Vault or other resources even though they exist, make sure you are connected to the right subscription by running `az account show` and, if needed, `az account set --subscription <subscription_id>`."
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Generate the Feathr client configuration\n",
        "\n",
        "The code below writes the configuration to a temporary location that the Feathr client will use. Please refer to [feathr_config.yaml](https://github.com/feathr-ai/feathr/blob/main/feathr_project/feathrcli/data/feathr_user_workspace/feathr_config.yaml) for the full list of configuration options and details."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "config_path = generate_config(\n",
        "    resource_prefix=RESOURCE_PREFIX,\n",
        "    project_name=PROJECT_NAME,\n",
        "    online_store__redis__host=REDIS_HOST,\n",
        "    feature_registry__api_endpoint=REGISTRY_ENDPOINT,\n",
        "    spark_config__spark_cluster=SPARK_CLUSTER,\n",
        "    spark_config__azure_synapse__dev_url=AZURE_SYNAPSE_URL,\n",
        "    spark_config__azure_synapse__pool_name=AZURE_SYNAPSE_SPARK_POOL,\n",
        "    spark_config__azure_synapse__workspace_dir=AZURE_SYNAPSE_WORKING_DIR,\n",
        ")\n",
        "\n",
        "with open(config_path, 'r') as f: \n",
        "    print(f.read())"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Initialize Feathr Client"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "client = FeathrClient(config_path=config_path, credential=credential)"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## 3. Define Sharable Features using Feathr API\n",
        "\n",
        "### Understand raw datasets\n",
        "We have three datasets to work with:\n",
        "* Observation dataset (a.k.a. labeled dataset)\n",
        "* User profile\n",
        "* User purchase history"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Upload datasets into ADLS so that the Synapse job can access them\n",
        "user_observation_source_path = client.feathr_spark_launcher.upload_or_get_cloud_path(\n",
        "    PRODUCT_RECOMMENDATION_USER_OBSERVATION_URL\n",
        ")\n",
        "user_profile_source_path = client.feathr_spark_launcher.upload_or_get_cloud_path(\n",
        "    PRODUCT_RECOMMENDATION_USER_PROFILE_URL\n",
        ")\n",
        "user_purchase_history_source_path = client.feathr_spark_launcher.upload_or_get_cloud_path(\n",
        "    PRODUCT_RECOMMENDATION_USER_PURCHASE_HISTORY_URL\n",
        ")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Observation dataset\n",
        "# An observation dataset usually comes with an event_timestamp column to denote when the observation happened.\n",
        "# The label here is product_rating. Our model objective is to predict a user's rating for this product.\n",
        "pd.read_csv(user_observation_source_path).head()"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# User profile dataset\n",
        "# Used to generate user features\n",
        "pd.read_csv(user_profile_source_path).head()"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# User purchase history dataset.\n",
        "# Used to generate user features. This is activity-type data, so we need to use aggregation to generate features.\n",
        "pd.read_csv(user_purchase_history_source_path).head()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        " After a bit of data exploration, we want to create a training dataset like this:\n",
        "\n",
        " \n",
        "![Feature Flow](https://github.com/feathr-ai/feathr/blob/main/docs/images/product_recommendation.jpg?raw=true)"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### What's a feature in Feathr\n",
        "A feature is an individual measurable property or characteristic of a phenomenon, and its value is sometimes time-sensitive.\n",
        "\n",
        "In Feathr, a feature is defined by the following characteristics:\n",
        "* The typed key (a.k.a. entity id): identifies the subject of the feature, e.g. a user id of 123, a product id of SKU234456.\n",
        "* The feature name: the unique identifier of the feature, e.g. user_age, total_spending_in_30_days.\n",
        "* The feature value: the actual value of that aspect at a particular time, e.g. the feature value of the person's age is 30 in 2022.\n",
        "* The timestamp: indicates when the event happened. For example, the user purchased a certain product at a certain time. This is usually used for point-in-time joins.\n",
        "\n",
        "Note that this definition takes the perspective of a feature consumer (a person who wants to use a feature): it only describes what a feature looks like. In later sections, you will see how a feature consumer can access features in a very simple way.\n",
        "\n",
        "To define how to produce the feature, we need to specify:\n",
        "* Feature source: the source data that the feature is based on.\n",
        "* Transformation: how the source data is transformed into the feature. The transformation is optional when you just want to take a column from the source data as is.\n",
        "\n",
        "(For more details on feature definition, please refer to the [Feathr Feature Definition Guide](https://feathr-ai.github.io/feathr/concepts/feature-definition.html).)"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Define a feature source with preprocessing function\n",
        "A [feature source](https://feathr.readthedocs.io/en/latest/#feathr.Source) defines where to find the source data and how to use the source data for the upcoming feature transformation. There are different types of feature sources that you can use. `HdfsSource` is the most commonly used one; it can connect you to a data lake, Snowflake database tables, etc. It is similar to a database connector.\n",
        "\n",
        "We define `HdfsSource` with the following arguments:\n",
        "* `name`: A name used to identify the source. It has to be unique among all feature sources.\n",
        "* `path`: It points to the source data.\n",
        "* `preprocessing` (optional): Data preprocessing UDF (user-defined function). The function will be applied to the data before all the feature transformations based on this source.\n",
        "* `event_timestamp_column` and `timestamp_format` (optional): The timestamp column and its format, used for point-in-time join (we will cover them later).\n",
        "\n",
        "See [the Python API documentation](https://feathr.readthedocs.io/en/latest/#feathr.HdfsSource) for the details of each input field."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "def feathr_udf_preprocessing(df: DataFrame) -> DataFrame:\n",
        "    from pyspark.sql.functions import col\n",
        "\n",
        "    return df.withColumn(\"tax_rate_decimal\", col(\"tax_rate\") / 100)\n",
        "\n",
        "\n",
        "batch_source = HdfsSource(\n",
        "    name=\"userProfileData\",\n",
        "    path=user_profile_source_path,\n",
        "    preprocessing=feathr_udf_preprocessing,\n",
        ")"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Define features\n",
        "\n",
        "To define features on top of the `HdfsSource`, we need to:\n",
        "1. Specify the key of the feature: like other data, features are keyed by some id, for example, user_id or product_id. You can also define compound keys.\n",
        "2. Specify the name of the feature via the `name` parameter and how to transform it from the source data via the `transform` parameter, along with other metadata such as `feature_type`.\n",
        "3. Group the features together via `FeatureAnchor` so we know they come from one `HdfsSource`. Also give the anchor a unique name via the `name` parameter so we can recognize it.\n",
        "\n",
        "It's called a **Feature Anchor** since this group of features is anchored to the source. There are other types of features that are computed on top of other features, called derived features (`DerivedFeature`); we will cover them in a later section."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "user_id = TypedKey(\n",
        "    key_column=\"user_id\",\n",
        "    key_column_type=ValueType.INT32,\n",
        "    description=\"user id\",\n",
        "    full_name=\"product_recommendation.user_id\",\n",
        ")\n",
        "\n",
        "feature_user_age = Feature(\n",
        "    name=\"feature_user_age\",\n",
        "    key=user_id,\n",
        "    feature_type=INT32,\n",
        "    transform=\"age\",\n",
        ")\n",
        "\n",
        "feature_user_tax_rate = Feature(\n",
        "    name=\"feature_user_tax_rate\",\n",
        "    key=user_id,\n",
        "    feature_type=FLOAT,\n",
        "    transform=\"tax_rate_decimal\",\n",
        ")\n",
        "\n",
        "feature_user_gift_card_balance = Feature(\n",
        "    name=\"feature_user_gift_card_balance\",\n",
        "    key=user_id,\n",
        "    feature_type=FLOAT,\n",
        "    transform=\"gift_card_balance\",\n",
        ")\n",
        "\n",
        "feature_user_has_valid_credit_card = Feature(\n",
        "    name=\"feature_user_has_valid_credit_card\",\n",
        "    key=user_id,\n",
        "    feature_type=BOOLEAN,\n",
        "    transform=\"number_of_credit_cards > 0\",\n",
        ")\n",
        "\n",
        "features = [\n",
        "    feature_user_age,\n",
        "    feature_user_tax_rate,\n",
        "    feature_user_gift_card_balance,\n",
        "    feature_user_has_valid_credit_card,\n",
        "]\n",
        "\n",
        "user_feature_anchor = FeatureAnchor(\n",
        "    name=\"anchored_features\", source=batch_source, features=features\n",
        ")"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Define window aggregation features\n",
        "\n",
        "[Window aggregation](https://en.wikipedia.org/wiki/Window_function_%28SQL%29) helps us create more powerful features by compressing a large amount of information. For example, we can compute the *average purchase amount over the last 90 days* from the purchase history to capture a user's recent consumption trend.\n",
        "\n",
        "To create window aggregation features, we define `WindowAggTransformation` with the following arguments:\n",
        "1. `agg_expr`: the field/column you want to aggregate. It can be an ANSI SQL expression, e.g. `cast_float(purchase_amount)` to cast `str` type values to `float`.\n",
        "2. `agg_func`: the aggregation function, e.g. `AVG`. See the table below for the full list of supported functions.\n",
        "3. `window`: the aggregation window size, e.g. `90d` to aggregate over the last 90 days.\n",
        "\n",
        "| Aggregation Type | Input Type | Description |\n",
        "| --- | --- | --- |\n",
        "| `SUM`, `COUNT`, `MAX`, `MIN`, `AVG` | Numeric | Applies the numerical operation to the numeric inputs. |\n",
        "| `MAX_POOLING`, `MIN_POOLING`, `AVG_POOLING` | Numeric Vector | Applies the max/min/avg operation on a per-entry basis for a given collection of numbers. |\n",
        "| `LATEST` | Any | Returns the latest non-null value within the defined time window. |\n",
        "\n",
        "\n",
        "\n",
        "After you have defined features and sources, bring them together to build an anchor:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "purchase_history_data = HdfsSource(\n",
        "    name=\"purchase_history_data\",\n",
        "    path=user_purchase_history_source_path,\n",
        "    event_timestamp_column=\"purchase_date\",\n",
        "    timestamp_format=\"yyyy-MM-dd\",\n",
        ")\n",
        "\n",
        "agg_features = [\n",
        "    Feature(\n",
        "        name=\"feature_user_avg_purchase_for_90days\",\n",
        "        key=user_id,\n",
        "        feature_type=FLOAT,\n",
        "        transform=WindowAggTransformation(\n",
        "            agg_expr=\"cast_float(purchase_amount)\", agg_func=\"AVG\", window=\"90d\"\n",
        "        ),\n",
        "    )\n",
        "]\n",
        "\n",
        "user_agg_feature_anchor = FeatureAnchor(\n",
        "    name=\"aggregationFeatures\", source=purchase_history_data, features=agg_features\n",
        ")"
      ]
    },
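    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "To make the `AVG` aggregation over a `90d` window more concrete, the following is a minimal plain-Python sketch of the idea, not Feathr's implementation. The helper `windowed_avg`, the sample purchases, and the exact window boundaries (start exclusive, end inclusive) are assumptions made for illustration only.\n",
        "\n",
        "```python\n",
        "from datetime import date, timedelta\n",
        "\n",
        "\n",
        "def windowed_avg(events, as_of, window_days=90):\n",
        "    # events: list of (purchase_date, purchase_amount) tuples for one user.\n",
        "    # Keep only events with as_of - window_days < purchase_date <= as_of.\n",
        "    window_start = as_of - timedelta(days=window_days)\n",
        "    amounts = [amount for day, amount in events if window_start < day <= as_of]\n",
        "    # No purchases in the window means there is no value to report.\n",
        "    return sum(amounts) / len(amounts) if amounts else None\n",
        "\n",
        "\n",
        "# Hypothetical purchase history for a single user\n",
        "purchases = [\n",
        "    (date(2021, 1, 5), 100.0),  # older than 90 days at observation time\n",
        "    (date(2021, 4, 1), 20.0),\n",
        "    (date(2021, 5, 1), 40.0),\n",
        "    (date(2021, 6, 1), 300.0),  # happens after the observation time\n",
        "]\n",
        "\n",
        "avg_90d = windowed_avg(purchases, as_of=date(2021, 5, 10))\n",
        "print(avg_90d)  # 30.0\n",
        "```\n",
        "\n",
        "Only the two purchases inside the 90 days before the observation date contribute to the average; the older purchase and the one made after the observation date are ignored."
      ]
    },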
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Define derived features\n",
        "Derived features are the features that are computed from anchored features or other derived features.\n",
        "Typical use cases include feature cross (f1 * f2) or cosine similarity between two features."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "derived_features = [\n",
        "    DerivedFeature(\n",
        "        name=\"feature_user_purchasing_power\",\n",
        "        key=user_id,\n",
        "        feature_type=FLOAT,\n",
        "        input_features=[feature_user_gift_card_balance, feature_user_has_valid_credit_card],\n",
        "        transform=\"feature_user_gift_card_balance + if(boolean(feature_user_has_valid_credit_card), 100, 0)\",\n",
        "    )\n",
        "]"
      ]
    },
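    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "As a rough illustration of what such derivations compute, the sketch below mirrors the `feature_user_purchasing_power` expression above in plain Python and adds a cosine similarity helper. The function names and sample values are hypothetical; in the notebook itself, Feathr evaluates the `transform` expression.\n",
        "\n",
        "```python\n",
        "from math import sqrt\n",
        "\n",
        "\n",
        "def purchasing_power(gift_card_balance, has_valid_credit_card):\n",
        "    # Same arithmetic as the transform expression: add 100 when the user has a valid card\n",
        "    return gift_card_balance + (100 if has_valid_credit_card else 0)\n",
        "\n",
        "\n",
        "def cosine_similarity(u, v):\n",
        "    # Cosine of the angle between two equal-length numeric vectors\n",
        "    dot = sum(a * b for a, b in zip(u, v))\n",
        "    norm_u = sqrt(sum(a * a for a in u))\n",
        "    norm_v = sqrt(sum(b * b for b in v))\n",
        "    if norm_u == 0 or norm_v == 0:\n",
        "        return 0.0  # convention chosen for this sketch\n",
        "    return dot / (norm_u * norm_v)\n",
        "\n",
        "\n",
        "print(purchasing_power(25.0, True))               # 125.0\n",
        "print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # 0.0\n",
        "```"
      ]
    },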
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Build features\n",
        "Lastly, we need to build these features so that they can be consumed later."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "client.build_features(\n",
        "    anchor_list=[user_agg_feature_anchor, user_feature_anchor],\n",
        "    derived_feature_list=derived_features,\n",
        ")"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Extra topic: Pass-through features\n",
        "\n",
        "Some features can be computed directly from the observation data at runtime and thus do not require an entity key or timestamp for joining, for example, *the day of the week of the request*. We can define such features by passing `source=INPUT_CONTEXT` to the anchor. Details about pass-through features can be found [here](https://feathr-ai.github.io/feathr/concepts/feathr-concepts-for-beginners.html#motivation-on-input_context)."
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## 4. Create Training Data using Point-in-Time Correct Feature Join\n",
        "\n",
        "To create a training dataset using Feathr, we need to provide **feature join settings** that specify which features to use and how they should be joined to the observation data.\n",
        "\n",
        "To learn more on this topic, please refer to [Point-in-time Correctness document](https://feathr-ai.github.io/feathr/concepts/point-in-time-join.html)."
      ]
    },
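    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The idea behind a point-in-time join can be sketched in plain Python: for each observation, only feature values recorded at or before the observation's timestamp are eligible, which keeps information from the future out of the training data. This is a simplified illustration rather than Feathr's implementation; `point_in_time_lookup` and the sample rows are hypothetical.\n",
        "\n",
        "```python\n",
        "from datetime import date\n",
        "\n",
        "\n",
        "def point_in_time_lookup(feature_rows, entity_id, observed_at):\n",
        "    # Return the latest feature value for entity_id recorded at or before observed_at\n",
        "    candidates = [\n",
        "        (ts, value)\n",
        "        for key, ts, value in feature_rows\n",
        "        if key == entity_id and ts <= observed_at\n",
        "    ]\n",
        "    if not candidates:\n",
        "        return None  # no feature value was known yet at observation time\n",
        "    return max(candidates, key=lambda pair: pair[0])[1]\n",
        "\n",
        "\n",
        "# Hypothetical feature history: (user_id, timestamp, gift_card_balance)\n",
        "feature_rows = [\n",
        "    (1, date(2021, 1, 1), 50.0),\n",
        "    (1, date(2021, 3, 1), 80.0),\n",
        "    (1, date(2021, 6, 1), 10.0),  # recorded after the observation below\n",
        "    (2, date(2021, 2, 1), 5.0),\n",
        "]\n",
        "\n",
        "# Hypothetical observations: (user_id, event_timestamp, product_rating)\n",
        "observations = [\n",
        "    (1, date(2021, 4, 15), 4),\n",
        "    (2, date(2021, 1, 15), 2),\n",
        "]\n",
        "\n",
        "# Attach to each observation the feature value that was valid at its timestamp\n",
        "training_rows = [\n",
        "    (user, ts, point_in_time_lookup(feature_rows, user, ts), label)\n",
        "    for user, ts, label in observations\n",
        "]\n",
        "print(training_rows)\n",
        "```\n",
        "\n",
        "In this sketch, user 1 gets the balance recorded on 2021-03-01 (the later 2021-06-01 value is ignored), and user 2 gets no value because nothing was recorded before the observation."
      ]
    },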
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "user_feature_query = FeatureQuery(\n",
        "    feature_list=[feat.name for feat in features + agg_features + derived_features],\n",
        "    key=user_id,\n",
        ")\n",
        "\n",
        "settings = ObservationSettings(\n",
        "    observation_path=user_observation_source_path,\n",
        "    event_timestamp_column=\"event_timestamp\",\n",
        "    timestamp_format=\"yyyy-MM-dd\",\n",
        ")\n",
        "\n",
        "client.get_offline_features(\n",
        "    observation_settings=settings,\n",
        "    feature_query=[user_feature_query],\n",
        "    output_path=user_profile_source_path.rpartition(\"/\")[0] + f\"/product_recommendation_features.avro\",\n",
        ")\n",
        "\n",
        "client.wait_job_to_finish(timeout_sec=5000)"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Let's use the helper function `get_result_df` to download the result and view it:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "res_df = get_result_df(client)\n",
        "res_df.head()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Train a machine learning model\n",
        "After getting all the features, let's train a machine learning model with the features computed by Feathr:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from sklearn.ensemble import GradientBoostingRegressor\n",
        "from sklearn.metrics import mean_squared_error\n",
        "from sklearn.model_selection import train_test_split\n",
        "\n",
        "\n",
        "# Fill None values with 0\n",
        "final_df = (\n",
        "    res_df\n",
        "    .drop([\"event_timestamp\"], axis=1, errors=\"ignore\")\n",
        "    .fillna(0)\n",
        ")\n",
        "\n",
        "# Split data into train and test\n",
        "X_train, X_test, y_train, y_test = train_test_split(\n",
        "    final_df.drop([\"product_rating\"], axis=1),\n",
        "    final_df[\"product_rating\"].astype(\"float64\"),\n",
        "    test_size=0.2,\n",
        "    random_state=42,\n",
        ")\n",
        "\n",
        "# Train a prediction model\n",
        "model = GradientBoostingRegressor()\n",
        "model.fit(X_train, y_train)\n",
        "\n",
        "# Predict and evaluate\n",
        "y_pred = model.predict(X_test)\n",
        "rmse = sqrt(mean_squared_error(y_test.values.flatten(), y_pred))\n",
        "\n",
        "print(f\"Root mean squared error: {rmse}\")"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## 5. Feature Materialization\n",
        "\n",
        "In the previous section, we demonstrated how Feathr can compute feature values on the fly from feature definitions to generate a training dataset. Now let's talk about how we can use the trained models.\n",
        "\n",
        "We can use the trained models for both online and offline inference. In both cases, we need features to be fed into the models. For offline inference, you can compute and get the features on demand, or you can store the computed features in an offline database for later offline inference.\n",
        "\n",
        "For online inference, we can use Feathr to compute the features and store them in the online database, and then use them when an inference request comes in.\n",
        "\n",
        "![img](../../images/online_inference.jpg)\n",
        "\n",
        "In this section, we will focus on materializing features to the online store. For materialization to an offline store, you can check out our [user guide](https://feathr-ai.github.io/feathr/concepts/materializing-features.html#materializing-features-to-offline-store).\n",
        "\n",
        "\n",
        "### Materialize feature values to online store\n",
        "We can push the computed features to the online store (Redis) like below:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Materialize user features\n",
        "# Note: only features that share the same entity key can be materialized into one table.\n",
        "redisSink = RedisSink(table_name=\"user_features\")\n",
        "settings = MaterializationSettings(\n",
        "    name=\"user_feature_setting\",\n",
        "    sinks=[redisSink],\n",
        "    feature_names=[\"feature_user_age\", \"feature_user_gift_card_balance\"],\n",
        ")\n",
        "\n",
        "client.materialize_features(settings=settings, allow_materialize_non_agg_feature=True)\n",
        "client.wait_job_to_finish(timeout_sec=5000)"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Fetch feature values from online store\n",
        "We can then get the features from the online store via the client's `get_online_features` or `multi_get_online_features` API."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "client.get_online_features(\n",
        "    \"user_features\", \"2\", [\"feature_user_age\", \"feature_user_gift_card_balance\"]\n",
        ")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "client.multi_get_online_features(\n",
        "    \"user_features\", [\"1\", \"2\"], [\"feature_user_age\", \"feature_user_gift_card_balance\"]\n",
        ")"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## 6. Feature Registration\n",
        "Lastly, we can also register the features and share them across teams:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "try:\n",
        "    client.register_features()\n",
        "except Exception as e:\n",
        "    print(e)\n",
        "print(client.list_registered_features(project_name=PROJECT_NAME))"
      ]
    },
    {
      "attachments": {},
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Summary\n",
        "In this notebook, you learned how to set up Feathr and use it to create features, register them, and use them for model training and inference.\n",
        "\n",
        "We hope this example gave you a good sense of Feathr's capabilities and how you could leverage it within your organization's MLOps workflow."
      ]
    }
  ],
  "metadata": {
    "kernelspec": {
      "display_name": "Python 3",
      "language": "python",
      "name": "python3"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.8.5 (default, Jan 27 2021, 15:41:15) \n[GCC 9.3.0]"
    },
    "vscode": {
      "interpreter": {
        "hash": "916dbcbb3f70747c44a77c7bcd40155683ae19c65e1c03b4aa3499c5328201f1"
      }
    }
  },
  "nbformat": 4,
  "nbformat_minor": 2
}
